import { EventEmitter } from "./EventEmitter";

/**
 * Options accepted by the `Queue` constructor. Every member is optional.
 */
interface QueueConfig {
	/** Initial tasks to enqueue. */
	workers?: any[];
	/** Maximum number of tasks allowed to run at the same time. */
	concurrency?: number;
	/** When true, the queue starts running as soon as tasks are pushed. */
	autoStart?: boolean;
}

/**
 * A promise-based task queue with an optional concurrency limit.
 *
 * A task is either a function (called with no arguments, its return value is
 * awaited) or any other value / promise (awaited as-is).
 *
 * Events emitted:
 * - `"start"`   — the queue switches from idle to running.
 * - `"success"` — a task fulfilled; the argument is its result.
 * - `"error"`   — a task rejected or threw; the arguments are the error and the task.
 * - `"end"`     — the run finished (argument `null`) or was aborted (argument is an `Error`).
 *
 * Note: `abort()` does not cancel tasks that are already in flight; it only
 * stops new tasks from being launched and suppresses `"success"` / `"error"`
 * for results that arrive while the queue is still aborted.
 */
export class Queue extends EventEmitter {
	/** Tasks waiting to be launched. */
	public workers: any[] = [];
	/** Every task ever pushed; used to rebuild `workers` when restarting after an abort. */
	public workersCache: any[] = [];
	/** Number of tasks currently in flight. */
	private pending: number = 0;
	/** Whether `push()` should start the queue automatically. */
	private autoStart: boolean = false;
	/** Whether a run is currently in progress. */
	private isRuning: boolean = false;
	/** Whether the queue has been aborted and not restarted since. */
	private isAborted: boolean = false;
	/** Maximum number of tasks `start()` keeps in flight at once. */
	private concurrency: number = Infinity;

	/**
	 * @param config Optional settings; missing members fall back to
	 * `autoStart: false`, `workers: []` and `concurrency: Infinity`.
	 */
	constructor(config: QueueConfig = { autoStart: false, workers: [], concurrency: Infinity }) {
		super();
		this.autoStart = config?.autoStart ?? false;
		this.concurrency = config?.concurrency ?? Infinity;
		this.push(...(config?.workers ?? []));
	}

	/**
	 * Number of tasks that are queued or in flight. Completed tasks are not counted.
	 */
	get length(): number {
		return this.workers.length + this.pending;
	}

	/**
	 * Appends tasks to the queue and, when `autoStart` is enabled, starts it.
	 * @param workers Tasks to enqueue.
	 */
	public push(...workers: any[]): void {
		this.workers.push(...workers);
		this.workersCache.push(...workers);

		if (this.autoStart) {
			this.start();
		}
	}

	/**
	 * Launches one task, or finishes the run when nothing is left.
	 * @param worker Task to run; when omitted the next queued task is taken.
	 */
	private next(worker: any = null): void {
		if (this.isAborted) {
			return;
		}
		if (!worker) {
			if (this.length <= 0) {
				// Nothing queued and nothing in flight: the run is complete.
				this.isRuning = false;
				this.emit("end", null);
				return;
			}
			if (this.workers.length <= 0) {
				// Tasks are still in flight; the last one to settle ends the run.
				return;
			}
		}
		this.pending++;
		const task = worker ?? this.workers.shift();

		// A task function may throw synchronously. Turn that into a rejection so
		// it is reported through "error" and `pending` is still decremented;
		// otherwise the exception would escape start()/push() and the queue
		// could never reach "end".
		let outcome: Promise<unknown>;
		try {
			outcome = Promise.resolve(typeof task === "function" ? task() : task);
		} catch (error) {
			outcome = Promise.reject(error);
		}

		outcome
			.then((res) => {
				if (!this.isAborted) {
					this.emit("success", res);
				}
			})
			.catch((error) => {
				if (!this.isAborted) {
					this.emit("error", error, task);
				}
			})
			.finally(() => {
				this.pending--;
				if (!this.isAborted) {
					this.next();
				}
			});
	}

	/**
	 * Starts (or resumes) processing, launching tasks up to the concurrency limit.
	 *
	 * If the queue was aborted earlier, the pending list is rebuilt from every
	 * task ever pushed, so all of them run again. Starting a queue with nothing
	 * to do emits `"start"` immediately followed by `"end"`.
	 */
	public start(): void {
		if (this.isAborted) {
			// Restart after an abort: re-run the full original task list.
			this.workers = [...this.workersCache];
		}
		this.isAborted = false;
		if (!this.isRuning) {
			// Flip the flag before emitting so that a listener calling abort()
			// or start() observes a consistent "running" state.
			this.isRuning = true;
			this.emit("start");
		}
		while (this.concurrency > this.pending && this.workers.length > 0 && !this.isAborted) {
			this.next();
		}
		if (this.isRuning && !this.isAborted && this.length <= 0) {
			// Nothing was queued or in flight: finish the (empty) run right away
			// instead of staying "running" forever without an "end" event.
			this.isRuning = false;
			this.emit("end", null);
		}
	}

	/**
	 * Aborts the queue: no further tasks are launched, and if a run was in
	 * progress `"end"` is emitted with an `Error`.
	 */
	public abort(): void {
		this.isAborted = true;
		if (this.isRuning) {
			this.isRuning = false;
			this.emit("end", new Error("The queue is aborted !"));
		}
	}

	/**
	 * Alias for `abort()`.
	 */
	public end(): void {
		this.abort();
	}
}
